iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 26
0
Modern Web

Angular新手村學習筆記(2019)系列 第 26

Day26_RxJS Custom Operators(1/3)

  • 分享至 

  • xImage
  •  

後來在youtube看到ng conf的影片,覺得有相關
所以RxJS Custom Operators分成3天PO
明天、後天分別是:

延申閱讀
ng conf 2018
Use the Custom Operator Force; Become an RxJS Jedi - Ryan Chenkie
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s
https://github.com/chenkie?tab=repositories
ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51

[S05E08]VSCode基本介紹
https://www.youtube.com/watch?v=kEllE8sZtuE&list=PL9LUW6O9WZqgUMHwDsKQf3prtqVvjGZ6S&index=15
VSCode與測試無關,就跳過囉
文件非常完整,基本上取自己需要的部分設定即可
https://hackmd.io/3uNcseF9TfeiEa0q-VDQXQ

本集的程式碼
https://github.com/chenkie/custom-operators-workshop

這一集由Kevin大大講解
[S05E09] RxJS Custom Operators
https://www.youtube.com/watch?v=HdyTkaD46ls&list=PL9LUW6O9WZqgUMHwDsKQf3prtqVvjGZ6S&index=14

這一集只有14分鐘,
就是講2篇,Kevin大大自己在blog裡寫的文章
雖說只有2篇文章,不過感覺是比較進階的內容了

可見其他文章都非常值得看(只是沒人講解可能深了一點)
https://blog.kevinyang.net/

Keive大大的程式
https://stackblitz.com/edit/typescript-g6f86y

以下就針對index.ts裡,14分鐘有講到的部分註解
(整段copy出來也是避免以後程式不見,還能參考)

import { from, fromEvent, timer, interval, Observable, of } from 'rxjs';
import { tap, take, mergeMap, bufferTime, map, throttleTime, windowTime, sampleTime, auditTime, toArray } from 'rxjs/operators';

console.clear();


/*========================================================
練習各種跟時間有關的 operators
bufferTime
throttleTime
windowTime
sampleTime
auditTime
==========================================================*/
// fromEvent(document, 'click').pipe(
//   tap(event => console.log('tap:', event)),
//   windowTime(2000),
//   mergeMap(event => {
//     console.log('-----');
//     console.log(event);
//     console.log('=====');
//     return event.pipe(toArray(), tap(v => console.log('o:', v)));
//   }),
//   take(5),
// ).subscribe();


/* ========================================================
 平均陣列的值,建立 avg operators
===========================================================*/
// const avg = map(data => {
//   if (!Array.isArray(data)) return data;
//   if (data.length === 0) return '0';
//   return (data.reduce((acc, curr) => acc + curr, 0) / data.length);
// });

// interval(1000).pipe(
//   mergeMap(v => from(createRamdonSizeArray())),
//   bufferTime(800),
//   avg,
//   take(10),
// ).subscribe(console.log);


// function createRamdonSizeArray() {
//   return Array.from({ length: Math.floor(Math.random() * 10) + 3 }).map(v => Math.floor(Math.random() * 10) + 1);
// }

/* ========================================================
 建立自訂 operators 的方法
 ======================================================== */
/* 方法 1 */ 如果要由外面傳資料進去,可以用function的方式。很像map()
                   VVVVV
const Multiply1 = (times) => (source: Observable<any>) =>
                              ^^^^^^^^^^^^^^^^^^^^^^^接受一個 Observable
                 $$$$$$$$
  new Observable(observer => {
  ^^^^^^^^^^^^^^回傳一個新的 Observable
    return source.subscribe({
                  ^^^^^^^^^在 subscribe 處理,next、error 和 complete
      next(value) {
        $$$$$$$$ 將相關的動作對應到上層 (new Observable()) 的 observer
        observer.next(value * times);
      },
      error(err) {
        $$$$$$$$
        observer.error(err);
      },
      complete() {
        $$$$$$$$
        observer.complete();
      }
    })
  });

// 試用
of(1,2,3).pipe(Multiply1(10)).subscribe(console.log);
               ^^^^^^^^^^^^^ 很像map()
/* 方法 2 */
const Multiply = (times) => map(value => value * times);
                            ^^^ 用map()來實作 function
                            寫一行的方式,沒法串太多operator
適合只用1個operator,很精簡,例如:filter()也適合寫成1行
或
const Multiply2 = (times) => obs => obs.pipe(
  map((value: number) => value * times)
);

/* 方法 2-1 不同的寫法 */
function Multiply2_1(times) {
  return function (obs) {
    return obs.pipe(
      map((value: number) => value * times)
    )
  }
}

/* 方法 3 */
const Multiply3 = (times) => map((value: number) => value * times);


const source$ = of(1, 2, 3);
console.log('---------方法1---------');
source$.pipe(Multiply1(2)).subscribe(console.log);
console.log('---------方法2---------');
source$.pipe(Multiply2(2)).subscribe(console.log);
console.log('---------方法2_1---------');
source$.pipe(Multiply2_1(2)).subscribe(console.log);
console.log('---------方法4---------');
source$.pipe(Multiply3(2)).subscribe(console.log);

以下是影片的程式,跟文章有一點點差異,參考用

import { Observable, of, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

const Multiply = (source: Observable<any>)=>
    new Observable(observer => {
        return source.subscribe({
            next(value){
                observer.next(value*2);
            },
            error(err){
                observer.error(err);
            },
            complete(){
                observer.complete();
            }
        })
    });

const test = (source: Observable) => combineLatest(source, of(1)).pipe(
原則:          接受Observable          回傳Observable
    map((v1,v2)=>({v1,v2}))
)

of(1,2,3).pipe(test).subscribe(console.log);
// {v1: Array[2], v2: 0}
// v1: Array[2]
//   0:3
//   1:1
// v2: 0

載入已寫好的library、helper或utility

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
import _isNumber from 'lodash/isNumber';
      VVVVVVVV         ^^^^^^ 用lodash載入isNulber function
const isNumber = () => filter(_isNumber);
                              ^^^^^^^^^ 判斷是不是Number

const source$ = from([1, 2, '3']);

source$.pipe(
  isNumber() // 使用
).subscribe(console.log);

scheduler

Kevin大大用一點時間講他的另一篇文章
https://blog.kevinyang.net/2018/08/31/rxjs-scheduler/

自訂Observable必需要學會scheduler
因為Observalbe的底層是用scheduler來完成的

https://blog.kevinyang.net/2018/08/31/rxjs-scheduler/#dive-in
scheduler底層會把所有的動作變成work

  • work 可以被執行的單位

  • action 去執行work

  • subscription

const sub = new Subscription();
const sub1 = queueScheduler.schedule((state)=> console.log(state), 0, 42);
const sub2 = queueScheduler.schedule((state)=> console.log(state), 0, '123');
                            ^^^^^^^^ 用schedule會回傳subscription

sub.add(sub1); // 用subscription.add(),去加入其他subscription
sub.add(sub2);
sub.unsubscribe();
    ^^^^^^^^^^^ 被加進來的sub1、sub2同時也會被unsubscribe()
    
類似之前介紹的:
在Observable裡的,operator裡面加takeUntil()
再用subject去觸發什麼時候中斷

找一個簡單的Observable,對一下operator怎麼做的

  • from.ts

原始碼較新,較影片裡的from.ts的程式更精簡
https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/from.ts

import { Observable } from '../Observable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from '../scheduled/scheduled';
...
 * @see {@link fromEvent}
 * @see {@link fromEventPattern}
 *
 * @param {ObservableInput<T>} A subscription object, a Promise, an Observable-like,
 *         ^^^^^^^^^^^^^^^^^^    1111111111111111111    2222222     333333333333333
 * an Array, an iterable, or an array-like object to be converted.
 *    44444     55555555        66666666666666666
 * @param {SchedulerLike} An optional {@link SchedulerLike} on which to schedule the emission of values.
 * @return {Observable<T>}
 * @name from
 * @owner Observable
 */
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {                ^^^^^^^^^^^^^^^^^^ 上述1~6種case
  if (!scheduler) {
    if (input instanceof Observable) {
      return input;
    }
    return new Observable<T>(subscribeTo(input));
  } else {
    return scheduled(input, scheduler);
  }
}

要切換TAB到6.0.0左右的版本,才能看到跟Kevin大大一樣的程式碼

import { Observable } from '../Observable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isObservable } from '../util/isObservable';
import { isIterable } from '../util/isIterable';
import { fromArray } from './fromArray';
import { fromPromise } from './fromPromise';
import { fromIterable } from './fromIterable';
import { fromObservable } from './fromObservable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike } from '../types';

// 多型
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>;          ^^^^^^^^^^^^^^^^^^^^^^^^^
export function from<T>(input: ObservableInput<ObservableInput<T>>, scheduler?: SchedulerLike): Observable<Observable<T>>;
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
  // 如果有scheduler
  if (!scheduler) {
    if (input instanceof Observable) {
      return input;
    }
    return new Observable(subscribeTo(input));
  }
  // 進入點
  if (input != null) {
    // 分case,如果是Observable,就做xxx
    if (isObservable(input)) {
      return fromObservable(input, scheduler);
      // 如果是Promise
    } else if (isPromise(input)) {
      return fromPromise(input, scheduler);
    } else if (isArrayLike(input)) {
      return fromArray(input, scheduler);
    }  else if (isIterable(input) || typeof input === 'string') {
      return fromIterable(input, scheduler);
    }
  }
  
  throw new TypeError((input !== null && typeof input || input) + ' is not observable');
}
  • fromArray.ts

https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/fromArray.ts
新版6.5.3已經大幅改寫

import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { subscribeToArray } from '../util/subscribeToArray';
import { scheduleArray } from '../scheduled/scheduleArray';

export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
  if (!scheduler) {
    return new Observable<T>(subscribeToArray(input));
           ^^^^^^^^^^^^^^^^^
  } else {
    return scheduleArray(input, scheduler);
  }
}

要切換TAG到6.0.0版左右

import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { Subscription } from '../Subscription';
import { subscribeToArray } from '../util/subscribeToArray';

export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
  if (!scheduler) {
    return new Observable<T>(subscribeToArray(input));
  } else {                   
    return new Observable<T>(subscriber => {
           ^^^^^^^^^^^^^^^^^ 也是new Observable
      const sub = new Subscription(); // 跟Kevin大大的scheduler文章寫法很像
      // 證明Observalbe的底層是用scheduler來完成的
      let i = 0;
              vvvvvvvvv 外面傳進來的scheduler
      sub.add(scheduler.schedule(function () {
          ^^^           ^^^^^^^^去執行工作
          把scheduler add到上層sub,到時unsubscribe會全部一起unsubscribe
        if (i === input.length) {
          subscriber.complete();  // subscirbe的complete()
          return;
        }
        subscriber.next(input[i++]);  // subscribe的next()
        if (!subscriber.closed) {
          sub.add(this.schedule());
        }
      }));
      return sub; // 回傳subscription
    });
  }
}

參考 fromArray、from的寫法,就能自訂義Observable的實作
自訂Observable的實作機會較少(Kevin大大只是附代一提)
自訂operator的實作機會較大


上一篇
Day25_CaseStudy: RxJS真實案例展示
下一篇
Day27_RxJS Custom Operators(2/3)(ng conf 2018)
系列文
Angular新手村學習筆記(2019)33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言